// This example shows how to create an OPC Data Access event source and query it for events that carry an even data value.
using System;
using System.Diagnostics;
using System.Reactive;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using OpcLabs.EasyOpc.DataAccess.ComplexEventProcessing;
using OpcLabs.EasyOpc.DataAccess.Reactive;
namespace SimpleDAStreamInsightApplication
{
    /// <summary>
    /// Console host for an embedded StreamInsight server. It exposes the server's management
    /// service over a WCF endpoint, feeds the server from an OPC Data Access item, and prints
    /// every event whose integer value is even.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the embedded server, publishes its management endpoint, defines and deploys
        /// the source and sink, runs the query, and blocks until the user presses Enter.
        /// </summary>
        static void Main()
        {
            // Create an embedded StreamInsight server.
            // NOTE(review): "Instance1" presumably has to match a StreamInsight instance that is
            // registered on this machine (an earlier revision used "Default") - confirm.
            using (var server = Server.Create("Instance1"))
            {
                Debug.Assert(server != null);

                // Create a local end point for the server embedded in this program
                var managementService = server.CreateManagementService();
                Debug.Assert(managementService != null);

                var host = new ServiceHost(managementService);
                try
                {
                    host.AddServiceEndpoint(
                        typeof(IManagementService),
                        new WSHttpBinding(SecurityMode.Message),
                        "http://localhost/MyStreamInsightServer");
                    host.Open();

                    /* The following entities will be defined and available in the server for other clients:
                     * serverApp
                     * serverSource
                     * serverSink
                     * serverProcess
                     */

                    // CREATE a StreamInsight APPLICATION in the server
                    var myApp = server.CreateApplication("serverApp");

                    // DEFINE a simple SOURCE. The simulated item name suggests the value
                    // increments once per second.
                    const string machineName = "";
                    const string serverClass = "OPCLabs.KitServer.2";
                    const string itemId = "Simulation.Incrementing (1 s)";
                    // NOTE(review): presumably the requested update rate in milliseconds - confirm
                    // against the DAItemChangedObservable.Create documentation.
                    const int requestedUpdateRate = 100;

                    var observable = DAItemChangedObservable.Create<int>(
                        machineName, serverClass, itemId, requestedUpdateRate);

                    // Each item change becomes a point event stamped with the local arrival time.
                    var mySource = myApp
                        .DefineObservable(() => observable)
                        .ToPointStreamable(
                            eventArgs =>
                                PointEvent.CreateInsert(DateTimeOffset.Now,
                                    (DAItemChangedPayload<int>)eventArgs),
                            AdvanceTimeSettings.StrictlyIncreasingStartTime);

                    // DEPLOY the source to the server for clients to use
                    mySource.Deploy("serverSource");

                    // Compose a QUERY over the source (return every event carrying even data value)
                    var myQuery = from e in mySource
                                  where e.VtqPayload.Value % 2 == 0
                                  select e;

                    // DEFINE a simple observer SINK (writes the value to the server console)
                    var mySink = myApp.DefineObserver(() => Observer.Create<DAItemChangedPayload<int>>(
                        payload => Console.WriteLine("sink_Server..: {0}", payload)));

                    // DEPLOY the sink to the server for clients to use
                    mySink.Deploy("serverSink");

                    // BIND the query to the sink and RUN it
                    var binding = myQuery.Bind(mySink);
                    Debug.Assert(binding != null);

                    // The running process is disposed as soon as the user presses Enter.
                    using (binding.Run("serverProcess"))
                    {
                        // Wait until the user stops the server
                        Console.WriteLine("----------------------------------------------------------------");
                        Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
                        Console.WriteLine("----------------------------------------------------------------");
                        Console.WriteLine(" ");
                        Console.ReadLine();
                    }

                    // Graceful shutdown on the success path.
                    host.Close();
                }
                catch
                {
                    // Close() is not reliable once something has failed (it throws on a faulted
                    // host), so release the listener with Abort() and let the original
                    // exception propagate.
                    host.Abort();
                    throw;
                }
            }
        }
    }
}